package com.yxsk.relay.job.component.admin.registry;

import com.yxsk.relay.job.component.admin.exception.registry.RelayJobRegistryException;
import com.yxsk.relay.job.component.admin.registry.callback.SlaveChangeCallback;
import com.yxsk.relay.job.component.admin.registry.security.SecurityChecker;
import com.yxsk.relay.job.component.common.thread.WakeupAbleTask;
import com.yxsk.relay.job.component.common.utils.CollectionUtils;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.vo.SlaveInfo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
 * @Author 11376
 * @CreaTime 2019/6/8 23:52
 * @Description 服务管理器
 */
@Slf4j
@NoArgsConstructor
public class Steward extends WakeupAbleTask {

    public Steward(StewardConfig config) {
        this.stewardConfig = config;
    }

    private List<SlaveChangeCallback> callbackList;

    // 注册安全检查器
    private ReadWriteLock checkerLock = new ReentrantReadWriteLock(false);
    private List<SecurityChecker> checkers = new ArrayList<>(5);

    public void setStewardConfig(StewardConfig stewardConfig) {
        this.stewardConfig = stewardConfig;
    }

    private StewardConfig stewardConfig;

    private boolean stop = false;

    // 注册列表, key: appName, value: slave list
    private Map<String, List<SlaveInfo>> appSlaves;

    // 契约有效期列表 key: host:port, value: slave
    private Map<String, SlaveContract> slaveContracts;

    // 异常下线 slave, key: host:port, value: slave
    private Map<String, SlaveContract> anomalyDownSlaves;

    private ReadWriteLock lock;

    private Thread workThread;

    public void init() {
        if (this.stewardConfig == null) {
            this.stewardConfig = new StewardConfig();
        }
        this.appSlaves = new HashMap<>();
        this.slaveContracts = new LinkedHashMap<>();
        if (this.stewardConfig.getRecordAnomalySlave()) {
            this.anomalyDownSlaves = new LinkedHashMap<>();
        } else {
            this.anomalyDownSlaves = Collections.EMPTY_MAP;
        }
        this.callbackList = Collections.synchronizedList(new ArrayList<>());
        this.lock = new ReentrantReadWriteLock(false);

        if (log.isDebugEnabled()) {
            log.debug("Relay-Job-Steward初始化完成");
        }
    }

    public void shutdown() {
        this.stop = true;
        this.wakeup();
        try {
            this.workThread.join();
        } catch (InterruptedException e) {
            // Noting.
        }
        if (log.isInfoEnabled()) {
            log.info("Steward task shutdown.");
        }
    }

    /**
     * @param callback
     * @Author 11376
     * @Description
     * @CreateTime 2019/6/15 11:52
     * @Return
     */
    public void addChangeCallback(SlaveChangeCallback callback) {
        this.callbackList.add(callback);
        if (log.isDebugEnabled()) {
            log.debug("Add slave change call back success. Callback class[{}]", callback.getClass());
        }
    }

    public void addSecurityChecker(SecurityChecker checker) {
        this.checkerLock.writeLock().lock();
        try {
            this.checkers.add(checker);
        } finally {
            this.checkerLock.writeLock().unlock();
        }
    }

    public void removeChangeCallback(SlaveChangeCallback callback) {
        this.callbackList.remove(callback);
        if (log.isDebugEnabled()) {
            log.debug("Remove slave change call back success. Callback class[{}]", callback.getClass());
        }
    }

    @Override
    public Object doWork() {
        this.workThread = Thread.currentThread();
        if (log.isDebugEnabled()) {
            log.debug("Relay-Job-Steward已启动");
        }
        while (!stop) {
            try {
                // 删除表头元素
                Date currentTime = getCurrentTime();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Fire timeout slaves...", currentTime);
                    log.debug("Current number of slaves {}", this.slaveContracts.size());
                }
                // 下一次检查间隔时间
                long nextTime = 0L;
                boolean lastE = false;
                List<SlaveInfo> slaves = new ArrayList<>();
                // 游标元素
                SlaveContract cursor = null;
                this.lock.readLock().lock();
                try {
                    Set<Map.Entry<String, SlaveContract>> entries = this.slaveContracts.entrySet();
                    // 每次检查 5 个元素
                    Iterator<Map.Entry<String, SlaveContract>> iterator = entries.iterator();
                    for (int i = 0, j = entries.size() < 5 ? entries.size() : 5; i < j; i++) {
                        cursor = iterator.next().getValue();
                        if (cursor.expirationTime.compareTo(currentTime) > 0) {
                            // 元素时间大于当前时间, break, 等待下一次
                            break;
                        }
                        SlaveContract contract = this.slaveContracts.get(getKey(cursor.slave.getEndpoint().getHost(), cursor.slave.getEndpoint().getPort()));
                        slaves.add(contract.slave.clone());
                        if (!iterator.hasNext()) {
                            lastE = true;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }

                // 被动解约 slave
                if (!CollectionUtils.isEmpty(slaves)) {
                    slaves.stream().forEach(slaveInfo -> this.terminate(slaveInfo, true));
                }

                if (log.isDebugEnabled()) {
                    log.debug("Fired number of slaves {}", slaves.size());
                }

                // 清除被动解约超时元素
                this.checkAnomalyDownSlaves();

                if (lastE || cursor == null) {
                    // 已到达最后一个元素
                    nextTime = this.stewardConfig.getExpirationTime();
                } else if (cursor != null && cursor.expirationTime.compareTo(currentTime) > 0) {
                    // 游标元素时间小于等于当前时间
                    nextTime = subtract(cursor.expirationTime, currentTime);
                }

                nextTime = nextTime < 0 ? 0 : nextTime;

                if (log.isDebugEnabled()) {
                    log.debug("Steward sleep {} millisecond...", nextTime);
                }

                if (!this.workThread.isInterrupted()) {
                    try {
                        Thread.sleep(nextTime);
                    } catch (InterruptedException e) {
                        log.warn("Steward thread interrupted.");
                    }
                }
            } catch (Exception e) {
                log.error("Steward error.", e);
                if (!this.workThread.isInterrupted()) {
                    long errorSleepTime = this.stewardConfig.getExpirationTime() / 3;
                    long threeSeconds = TimeUnit.MILLISECONDS.convert(3, TimeUnit.SECONDS);
                    try {
                        Thread.sleep(errorSleepTime <= threeSeconds ? threeSeconds : errorSleepTime);
                    } catch (InterruptedException e1) {
                        // Nothing.
                    }
                }
            }
        }
        return "steward shutdown.";
    }

    private void checkAnomalyDownSlaves() {
        if (log.isDebugEnabled()) {
            log.debug("Remove anomaly down slaves, number of elements[{}]", this.anomalyDownSlaves.size());
        }
        // 被动解约列表只有扫描线程修改，所以不用加锁
        if (!CollectionUtils.isEmpty(this.anomalyDownSlaves)) {
            synchronized (this.anomalyDownSlaves) {
                Date currentTime = this.getCurrentTime();
                Iterator<Map.Entry<String, SlaveContract>> iterator = this.anomalyDownSlaves.entrySet().iterator();
                while (iterator.hasNext()) {
                    SlaveContract contract = iterator.next().getValue();
                    if (contract.expirationTime.compareTo(currentTime) <= 0) {
                        // 超时
                        iterator.remove();
                        if (log.isDebugEnabled()) {
                            log.debug("Remove anomaly down slave[{}]", contract);
                        }
                    } else {
                        break;
                    }
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Remove anomaly down slaves complete.");
        }
    }

    /**
     * @param slaveInfo
     * @param token
     * @Author 11376
     * @Description 安全检查
     * @CreateTime 2019/6/15 10:46
     * @Return
     */
    private void securityCheck(SlaveInfo slaveInfo, String token) throws RelayJobRegistryException {
        this.checkerLock.readLock().lock();
        try {
            Iterator<SecurityChecker> iterator = this.checkers.iterator();
            while (iterator.hasNext()) {
                SecurityChecker checker = iterator.next();
                // 是否需要安全检查
                if (checker.isCheck(slaveInfo, token)) {
                    // 安全检查
                    checker.check(slaveInfo, token);
                }
            }
        } finally {
            this.checkerLock.readLock().unlock();
        }
    }

    /**
     * @param slaveInfo
     * @Author 11376
     * @Description 服务续期
     * @CreateTime 2019/6/9 11:35
     * @Return 契约有效时间
     */
    public Date renewal(SlaveInfo slaveInfo, String token) throws RelayJobRegistryException {
        if (log.isDebugEnabled()) {
            log.debug("renewal slave[{}], token[{}]", slaveInfo, token);
        }
        this.securityCheck(slaveInfo, token);
        this.lock.writeLock().lock();
        try {
            // 判断是否已存在契约
            String key = getKey(slaveInfo.getEndpoint().getHost(), slaveInfo.getEndpoint().getPort());
            SlaveContract contract = this.slaveContracts.get(key);

            if (contract != null) {
                // 续约
                // 移除契约列表
                this.slaveContracts.remove(key);
                // 移除应用节点列表
                this.appSlaves.get(slaveInfo.getAppName()).remove(contract.slave);
                this.addContract(slaveInfo);
                if (log.isDebugEnabled()) {
                    log.debug("Renewal slave success");
                }
            } else {
                // 新注册
                this.handshake(slaveInfo);
                if (log.isDebugEnabled()) {
                    log.debug("Registry slave success");
                }
            }

            return this.slaveContracts.get(key).expirationTime;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * @param slaveInfo
     * @Author 11376
     * @Description 服务主动下线
     * @CreateTime 2019/6/9 11:43
     * @Return
     */
    public void terminate(SlaveInfo slaveInfo, String token) throws RelayJobRegistryException {
        if (log.isDebugEnabled()) {
            log.debug("Slave offline, host[{}], token[{}]", slaveInfo.getEndpoint().getHost(), token);
        }

        this.lock.writeLock().lock();
        try {
            List<SlaveInfo> list = this.appSlaves.get(slaveInfo.getAppName());
            if (CollectionUtils.isEmpty(list)) {
                // 未找到注册信息
                return;
            }

            List<SlaveInfo> slaveInfos = list.stream().filter(info -> info.getEndpoint().equalsInstance(slaveInfo.getEndpoint())).collect(Collectors.toList());

            if (CollectionUtils.isNotEmpty(slaveInfos)) {
                Iterator<SlaveInfo> iterator = slaveInfos.iterator();
                while (iterator.hasNext()) {
                    SlaveInfo info = iterator.next();
                    this.securityCheck(info, token);
                    this.terminate(info, false);
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }

    }

    private void terminate(SlaveInfo slaveInfo, boolean abnormal) {
        this.fired(slaveInfo.getEndpoint().getHost(), slaveInfo.getEndpoint().getPort(), abnormal);
        if (log.isDebugEnabled()) {
            log.debug("terminate slave[{}]", slaveInfo);
        }
    }

    /**
     * @param appName
     * @Author 11376
     * @Description 获取注册信息
     * @CreateTime 2019/6/9 11:53
     * @Return
     */
    public List<SlaveContract> getSlaveContracts(String appName) {
        this.lock.readLock().lock();
        try {
            List<SlaveContract> result = new ArrayList<>();
            List<SlaveInfo> slaves = this.appSlaves.get(appName);
            if (!CollectionUtils.isEmpty(slaves)) {
                slaves.stream().forEach(slaveInfo -> result.add(this.slaveContracts.get(getKey(slaveInfo.getEndpoint().getHost(), slaveInfo.getEndpoint().getPort())).clone()));
            }
            return result;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    public List<SlaveInfo> getAnomalyDownSlaves() {
        this.lock.readLock().lock();
        try {
            List<SlaveInfo> result = new ArrayList<>();
            synchronized (this.anomalyDownSlaves) {
                this.anomalyDownSlaves.entrySet().stream().forEach(entry -> result.add(entry.getValue().slave.clone()));
            }
            return result;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * @param host
     * @param abnormal
     * @Author 11376
     * @Description 剔除服务
     * @CreateTime 2019/6/9 11:25
     * @Return
     */
    private void fired(String host, Integer port, boolean abnormal) {
        String key = getKey(host, port);
        SlaveContract contract = this.slaveContracts.get(key);
        if (contract != null) {
            // 移除契约列表
            this.slaveContracts.remove(key);
            // 移除注册列表
            SlaveInfo slave = contract.slave;
            this.appSlaves.get(slave.getAppName()).remove(slave);
            if (abnormal && this.stewardConfig.getRecordAnomalySlave()) {
                SlaveContract clone = contract.clone();
                clone.expirationTime = this.addMillisecond(getCurrentTime(), Math.toIntExact(this.stewardConfig.getAnomalyKeepTime()));
                this.anomalyDownSlaves.put(getKey(host, port), clone);
            }
            this.callbackList.stream().forEach(callback -> {
                try {
                    callback.offline(slave.clone());
                } catch (RuntimeException e) {
                    log.error("Callback error when slave offline, Callback class[" + callback.getClass() + "]", e);
                }
            });
        }
    }

    private String getKey(String host, Integer port) {
        return host.concat(":").concat(port.toString());
    }

    /**
     * @param slaveInfo
     * @Author 11376
     * @Description 添加服务
     * @CreateTime 2019/6/9 11:25
     * @Return
     */
    private void handshake(SlaveInfo slaveInfo) {
        addContract(slaveInfo);

        this.callbackList.stream().forEach(callback -> {
            try {
                callback.online(slaveInfo.clone());
            } catch (RuntimeException e) {
                log.error("Callback error when slave online, Callback class[" + callback.getClass() + "]", e);
            }
        });
    }

    private void addContract(SlaveInfo slaveInfo) {
        SlaveContract contract = new SlaveContract(addMillisecond(getCurrentTime(), Math.toIntExact(this.stewardConfig.getExpirationTime())), slaveInfo);
        List<SlaveInfo> slaves = this.appSlaves.get(slaveInfo.getAppName());
        if (slaves == null) {
            slaves = new ArrayList<>();
        }
        slaves.add(slaveInfo);
        // 添加注册列表
        this.appSlaves.put(slaveInfo.getAppName(), slaves);
        // 添加契约信息
        this.slaveContracts.put(getKey(slaveInfo.getEndpoint().getHost(), slaveInfo.getEndpoint().getPort()), contract);
    }

    private Date getCurrentTime() {
        return DateUtils.getCurrentDate();
    }

    private Date addMillisecond(Date time, int millisecond) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(time);
        calendar.add(Calendar.MILLISECOND, millisecond);
        return calendar.getTime();
    }

    private Date subtractMillisecond(Date time, int millisecond) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(time);
        calendar.add(Calendar.MILLISECOND, 0 - millisecond);
        return calendar.getTime();
    }

    private long subtract(Date d1, Date d2) {
        Calendar calendar1 = Calendar.getInstance();
        calendar1.setTime(d1);
        Calendar calendar2 = Calendar.getInstance();
        calendar2.setTime(d2);

        return calendar1.getTimeInMillis() - calendar2.getTimeInMillis();
    }

    @ToString
    @Getter
    @AllArgsConstructor
    @NoArgsConstructor
    public static class SlaveContract implements Cloneable {
        // 到期时间
        Date expirationTime;
        // slave
        SlaveInfo slave;

        @Override
        protected SlaveContract clone() {
            return new SlaveContract(new Date(expirationTime.getTime()), slave);
        }
    }

}
